package com.atguigu.api

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}


/**
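 * @description: Flink job that reads sensor readings from a text file and writes each sensor's temperature to a Redis hash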
 * @time: 2020/6/21 13:10
 * @author: baojinlong
 **/
object RedisSinkTest {

  // Defaults used when the corresponding command-line argument is not supplied;
  // they reproduce the values that were previously hard-coded.
  private val DefaultInputPath: String =
    "E:/qj_codes/big-data/FlinkTutorial/src/main/resources/sensor.data"
  private val DefaultRedisHost: String = "localhost"
  private val DefaultRedisPort: Int = 6379

  /**
   * Reads sensor readings from a text file and writes each reading's temperature
   * into the Redis hash `sensor_tmp`, using the sensor id as the hash field.
   *
   * Every non-blank input line is expected to have the form
   * `id,timestamp,temperature`. Whitespace around the individual fields is ignored
   * and blank lines are skipped. A line with fewer than three fields or with a
   * non-numeric timestamp/temperature still fails the job with the underlying
   * exception rather than being silently dropped.
   *
   * @param args optional positional overrides:
   *             `args(0)` input file path, `args(1)` Redis host, `args(2)` Redis port.
   *             Missing arguments fall back to the built-in defaults.
   * @throws NumberFormatException if `args(2)` is present but is not a valid integer
   */
  def main(args: Array[String]): Unit = {
    val inputPath: String = args.lift(0).getOrElse(DefaultInputPath)
    val redisHost: String = args.lift(1).getOrElse(DefaultRedisHost)
    val redisPort: Int = args.lift(2).map(_.trim.toInt).getOrElse(DefaultRedisPort)

    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Run with a single parallel task.
    environment.setParallelism(1)
    val inputStreamFromFile: DataStream[String] = environment.readTextFile(inputPath)

    // Parse each CSV line into a SensorReading.
    val dataStream: DataStream[SensorReading] = inputStreamFromFile
      // A blank (e.g. trailing) line would otherwise fail on fields(1).
      .filter(line => line.trim.nonEmpty)
      .map(line => {
        // Trim every field: toLong rejects surrounding whitespace such as " 1547718199".
        val fields: Array[String] = line.split(",").map(_.trim)
        SensorReading(fields(0), fields(1).toLong, fields(2).toDouble)
      })

    // Configure the Redis connection used by the sink.
    val conf: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
      .setHost(redisHost)
      .setPort(redisPort)
      .build()

    val redisMapper: RedisMapper[SensorReading] = new RedisMapper[SensorReading] {
      // Command used to store each record: HSET sensor_tmp <key> <value>
      override def getCommandDescription: RedisCommandDescription =
        new RedisCommandDescription(RedisCommand.HSET, "sensor_tmp")

      // The hash field is the sensor id.
      override def getKeyFromData(data: SensorReading): String = data.id

      // The hash value is the temperature rendered as a string.
      override def getValueFromData(data: SensorReading): String = data.temperature.toString
    }

    // Write the parsed readings to Redis.
    dataStream.addSink(new RedisSink[SensorReading](conf, redisMapper))
    environment.execute("sink simple test job")
  }
}
